package com.ww.flink

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * 1、每隔10s 计算最近Ns数据的wordcount
 */
object Flink_try13_count_window {
  case class CarInfo(carId: String, speed: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    val wordStream = initStream.flatMap(_.split(" "))
    val pairStream = wordStream.map((_, 1))
    //val keyByStream = pairStream.keyBy(_._1)

    pairStream.countWindowAll(5,2)
    .reduce(new ReduceFunction[(String, Int)] {
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        println("----")
        (value1._1 + "," + value2._1, value1._2 + value2._2)
      }
    }).print()
    env.execute()


  }


}
